package com.dec.kks.etl;

import com.dec.kks.etl.kafka.KafkaOffsetManager;
import com.dec.kks.etl.loader.FinalEntry;
import com.dec.kks.etl.producer.DECConfigUtil;
import com.dec.kks.etl.producer.KKSEntity;
import com.dec.kks.etl.producer.PacketData;
import com.google.gson.Gson;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;

import java.util.*;

public class DECHistoryLoaderByHourMain {

    /**
     * Entry point: consumes KKS telemetry JSON from Kafka via Spark Streaming,
     * parses each record into {@code FinalEntry} rows keyed by plant/unit and
     * partitioned by day and hour, then persists the consumed offsets through
     * {@code KafkaOffsetManager}.
     *
     * NOTE(review): assembly and storage of the parsed rows are still
     * unimplemented (see TODOs below); until a Spark action is added on
     * {@code hiveRDD}, only the offset bookkeeping has a visible effect.
     *
     * @param args unused command-line arguments
     * @throws Exception propagated from Spark/Kafka setup or streaming shutdown
     */
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("dec-kks-etl");
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
        // TODO: externalize this absolute path — it breaks on any other host.
        Map<String, Object> kafkaParams = DECConfigUtil.loadConfig("/home/hdfs/soft/IdeaProjects/dec-kks-etl/src/main/resources", 0);

        String topic = "spark-topic-streaming";
        Collection<String> topics = Arrays.asList(topic);

        // Restore the last committed offset for the subscribed topic so the
        // stream resumes where the previous run stopped.
        // FIX: the partition queried here must belong to the subscribed topic.
        // The original queried "rds-dealed-t00101", so the stored offset was
        // never applied to the topic actually consumed.
        Map<TopicPartition, Long> offset_map = new HashMap<TopicPartition, Long>();
        TopicPartition tp = new TopicPartition(topic, 0);
        long offset = KafkaOffsetManager.queryOffset(tp);
        offset_map.put(tp, offset);

        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams, offset_map)
                );

        stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
            @Override
            public void call(JavaRDD<ConsumerRecord<String, String>> consumerRecordJavaRDD) throws Exception {
                if (consumerRecordJavaRDD != null) {
                    // "Transaction" begin: capture this batch's offset ranges
                    // before any transformation, so they can be committed once
                    // the batch has been processed.
                    OffsetRange[] offsetRanges = ((HasOffsetRanges) consumerRecordJavaRDD.rdd()).offsetRanges();

                    // Parse: one Kafka JSON record fans out to N FinalEntry
                    // rows — one per measuring point in the packet.
                    JavaRDD<FinalEntry> hiveRDD = consumerRecordJavaRDD.coalesce(1).flatMap(new FlatMapFunction<ConsumerRecord<String, String>, FinalEntry>() {
                        @Override
                        public Iterator<FinalEntry> call(ConsumerRecord<String, String> record) throws Exception {
                            String value = record.value();
                            // FIX: never return null from flatMap — Spark
                            // dereferences the returned Iterator and would NPE.
                            if (StringUtils.isBlank(value)) {
                                return Collections.emptyIterator();
                            }
                            Gson gson = new Gson();
                            KKSEntity input = gson.fromJson(value, KKSEntity.class);

                            // Collect time arrives as epoch seconds; scale to millis.
                            long ct = input.getCollect_time() * 1000L;
                            String date_yMd = DateFormatUtils.format(ct, "yyyy-MM-dd"); // partition day
                            String date_H = DateFormatUtils.format(ct, "HH");           // partition hour
                            // Plant type + plant code form the composite plant key.
                            String pc = input.getPower_type() + input.getPower_code();
                            // Generating-unit code.
                            String puc = input.getPower_unit_code();

                            List<FinalEntry> entries = new ArrayList<FinalEntry>();
                            for (Map.Entry<String, PacketData> entry : input.getPacketed_data().entrySet()) {
                                FinalEntry finalEntry = new FinalEntry(pc, puc, date_yMd, date_H);
                                PacketData pd = entry.getValue();
                                finalEntry.setSrcCode(pd.getSrc_code());
                                finalEntry.setSrcDataQuality(pd.getSrc_data_quality());
                                finalEntry.setStdDataQuality(pd.getStd_data_quality());
                                // TODO: map the standard point code (entry.getKey())
                                // onto FinalEntry once the setter is available.
                                // FIX: the original built each row but never
                                // added it to the result list.
                                entries.add(finalEntry);
                            }
                            return entries.iterator();
                        }
                    });

                    // TODO: assemble the data.

                    // TODO: persist hiveRDD. NOTE(review): no Spark action is
                    // triggered on hiveRDD yet, so the flatMap above does not
                    // actually execute until one is added.

                    // "Transaction" end: commit the consumed offsets.
                    KafkaOffsetManager.saveOffset(offsetRanges);

                }
            }
        });

        jssc.start();
        jssc.awaitTermination();
    }
}
